package com.laosg.springboot.study.nsq.consumer.service.impl;

import cn.hutool.json.JSONUtil;
import com.github.brainlag.nsq.NSQConfig;
import com.github.brainlag.nsq.NSQConsumer;
import com.github.brainlag.nsq.NSQProducer;
import com.github.brainlag.nsq.exceptions.NSQException;
import com.github.brainlag.nsq.lookup.DefaultNSQLookup;
import com.laosg.springboot.study.nsq.consumer.config.properties.NsqProperties;
import com.laosg.springboot.study.nsq.consumer.model.NsqMessage;
import com.laosg.springboot.study.nsq.consumer.service.MessageService;
import com.sproutsocial.nsq.Publisher;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.UUID;
import java.util.concurrent.TimeoutException;

/**
 * Created by kaimin on 29/4/2019.
 * time : 10:16
 */
@Service
@Slf4j
public class MessageServiceImpl implements MessageService {

    private NSQProducer nsqProducer;

    @Autowired
    private Publisher publisher;

    @Autowired
    private NsqProperties nsqProperties;
    @Override
    public String sentMessage(String body) {
        try {
            NsqMessage nsqMessage = new NsqMessage();
            nsqMessage.setId(UUID.randomUUID().getLeastSignificantBits());
            nsqMessage.setAction("testChannel");
            nsqMessage.setBody(body);
            String msg = JSONUtil.toJsonStr(nsqMessage);
            //发只能发到一个具体的nsqd
            NSQConfig config=new NSQConfig();

            config.setMaxInFlight(2);
            nsqProducer.setConfig(config);//有两个nsqd

            nsqProducer.produce(nsqProperties.getTopic(), msg.getBytes());


            log.info("消息发送特定Channel成功!时间:{}", LocalDateTime.now());
            return "success";
        } catch (NSQException e) {
            log.error("nsq 连接异常!msg={}", e.getMessage());
            return "Error:" + e.getMessage();
        } catch (TimeoutException e) {
            log.error("nsq 发送消息超时!msg={}", e.getMessage());
            return "Error:" + e.getMessage();
        } catch (Exception e) {
            log.error("出现未知异常!", e);
            return "Error:" + e.getMessage();
        }
    }

    @Override
    public String sentMessage2(String body) {
        try {
            NsqMessage nsqMessage = new NsqMessage();
            nsqMessage.setId(UUID.randomUUID().getLeastSignificantBits());
            nsqMessage.setAction("Channel1");
            nsqMessage.setBody(body);
            String msg = JSONUtil.toJsonStr(nsqMessage);


            publisher.publish(nsqProperties.getTopic(),msg.getBytes());

            log.info("消息发送特定Channel成功!时间:{}", LocalDateTime.now());
            return "success";
        } catch (Exception e) {
            log.error("出现未知异常!", e);
            return "Error:" + e.getMessage();
        }
    }

    @Override
    public void consume() {
        DefaultNSQLookup nsqLookup = new DefaultNSQLookup();
        //加入 nsqLookupd的集群配置
        for (int i = 0; i < nsqProperties.getNsqlookup().getAddresses().size(); i++) {
            nsqLookup.addLookupAddress(nsqProperties.getNsqlookup().getAddresses().get(i),nsqProperties.getNsqlookup().getPorts().get(i));
        }
        NSQConfig config=new NSQConfig();

        config.setMaxInFlight(2);//因为多个nsqd 有想同的topic

        NSQConsumer nsqConsumer = new NSQConsumer(nsqLookup, nsqProperties.getTopic(), "testChannel", (message -> {
            if (message != null) {
                String msg = new String(message.getMessage());
                NsqMessage nsqMessage = null;
                try {
                    nsqMessage = JSONUtil.toBean(msg, NsqMessage.class);

                } catch (Exception e) {
                    log.error("消息无法转换，存在问题");
                    message.finished();
                    return;
                }
                if (!nsqMessage.getAction().equals("testChannel")) {
                    // 如果nsq消息体中的action不等于当前的chanel名称,说明不是当前消费者需要处理的数据,确认消费即可
                    message.finished();
                    return;
                }
                try {
                    log.info("消费特定的消息:{}", nsqMessage.getBody());
                    //确认消息
                    message.finished();
                    return;
                } catch (Exception e) {
                    //说明异常,重试下
                    message.requeue();

                }
                return;
            }
            message.finished();
            return;
        }),config);
        nsqConsumer.start();//启动一次就可以，
        log.info("nsq消费者启动成功");


    }

    @Override
    public void consume2() {
        DefaultNSQLookup nsqLookup = new DefaultNSQLookup();
        //加入 nsqLookupd的集群配置
        for (int i = 0; i < nsqProperties.getNsqlookup().getAddresses().size(); i++) {
            nsqLookup.addLookupAddress(nsqProperties.getNsqlookup().getAddresses().get(i),nsqProperties.getNsqlookup().getPorts().get(i));
        }
        NSQConsumer nsqConsumer = new NSQConsumer(nsqLookup, nsqProperties.getTopic(), "testChannel", (message -> {
            if (message != null) {
                String msg = new String(message.getMessage());
                NsqMessage nsqMessage = null;
                try {
                    nsqMessage = JSONUtil.toBean(msg, NsqMessage.class);

                } catch (Exception e) {
                    log.error("消息2无法转换，存在问题");
                    message.finished();
                    return;
                }
                if (!nsqMessage.getAction().equals("testChannel")) {
                    // 如果nsq消息体中的action不等于当前的chanel名称,说明不是当前消费者需要处理的数据,确认消费即可
                    message.finished();
                    return;
                }
                try {
                    log.info("消费2特定的消息:{}", nsqMessage.getBody());
                    //确认消息
                    message.finished();
                    return;
                } catch (Exception e) {
                    //说明异常,重试下
                    message.requeue();

                }
                return;
            }
            message.finished();
            return;
        }));
        nsqConsumer.start();//启动一次就可以，
        log.info("nsq消费者2启动成功");

    }
}
